{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "tags": []
   },
   "source": [
    "# 🪙 G-Research Crypto Kubeflow Pipeline\n",
    "![](./images/vector-blockchain-poster.jpg)\n",
    "\n",
    "---\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "tags": []
   },
   "source": [
    "In this [Kaggle competition](https://www.kaggle.com/competitions/g-research-crypto-forecasting/overview), you'll use your machine learning expertise to forecast short-term returns in 14 popular cryptocurrencies. The dataset provided contains information on historical trades for several cryptoassets, such as Bitcoin and Ethereum. \n",
    "\n",
    "> G-Research is a leading quantitative research and technology company. By using the latest scientific techniques, they produce world-beating predictive research and build advanced technology to analyse the world's data."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Install relevant libraries\n",
    "\n",
    "\n",
    "> Update pip: `pip install --user --upgrade pip`\n",
    "\n",
    "> Install or upgrade the Kubeflow Pipelines (KFP) SDK: `pip install kfp --upgrade --user --quiet`\n",
    "\n",
    "You may need to restart the notebook kernel after installing the KFP SDK."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied: pip in /usr/local/lib/python3.6/dist-packages (21.3.1)\n"
     ]
    }
   ],
   "source": [
    "%pip install --user --upgrade pip"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install kfp --upgrade --user --quiet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Name: kfp\n",
      "Version: 1.8.11\n",
      "Summary: KubeFlow Pipelines SDK\n",
      "Home-page: https://github.com/kubeflow/pipelines\n",
      "Author: The Kubeflow Authors\n",
      "Author-email: \n",
      "License: UNKNOWN\n",
      "Location: /home/jovyan/.local/lib/python3.6/site-packages\n",
      "Requires: absl-py, click, cloudpickle, dataclasses, Deprecated, docstring-parser, fire, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, typing-extensions, uritemplate\n",
      "Required-by: kubeflow-kale\n"
     ]
    }
   ],
   "source": [
    "# confirm which kfp sdk version is installed in the kernel's environment\n",
    "%pip show kfp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "tags": [
     "imports"
    ]
   },
   "outputs": [],
   "source": [
    "import kfp\n",
    "import kfp.components as comp\n",
    "import kfp.dsl as dsl\n",
    "from kfp.components import OutputPath\n",
    "from typing import NamedTuple"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "tags": []
   },
   "source": [
    "# Kubeflow pipeline component creation\n",
    "\n",
    "## Download the dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# download data step\n",
    "def download_data(dataset, data_path):\n",
    "    '''Download a Kaggle competition archive and extract the training CSV files.\n",
    "\n",
    "    Reads the Kaggle username and key from files under /secret/kaggle-secret,\n",
    "    downloads the archive of the `dataset` competition into the working\n",
    "    directory and extracts 'train.csv' and 'asset_details.csv' into `data_path`.\n",
    "\n",
    "    Args:\n",
    "        dataset: Kaggle competition name; the archive is opened under this name with a .zip suffix.\n",
    "        data_path: Directory that receives the extracted files (created if missing).\n",
    "\n",
    "    Raises:\n",
    "        subprocess.CalledProcessError: If installing kaggle or the download itself fails.\n",
    "    '''\n",
    "    # install the necessary libraries\n",
    "    import os, sys, subprocess, zipfile\n",
    "    # upgrading pip is best effort, so its exit status is not checked\n",
    "    subprocess.run(['python', '-m', 'pip', 'install', '--upgrade', 'pip'])\n",
    "    # fail here with pip's own error instead of later when the kaggle command is missing\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'kaggle'], check=True)\n",
    "\n",
    "    # setup kaggle environment for data download\n",
    "    with open('/secret/kaggle-secret/password', 'r') as file:\n",
    "        kaggle_key = file.read().rstrip()\n",
    "    with open('/secret/kaggle-secret/username', 'r') as file:\n",
    "        kaggle_user = file.read().rstrip()\n",
    "\n",
    "    os.environ['KAGGLE_USERNAME'], os.environ['KAGGLE_KEY'] = kaggle_user, kaggle_key\n",
    "\n",
    "    # create data_path directory\n",
    "    os.makedirs(data_path, exist_ok=True)\n",
    "\n",
    "    # download the competition data; check=True stops the step when the download\n",
    "    # fails instead of continuing to a missing-archive error below\n",
    "    subprocess.run(['kaggle', 'competitions', 'download', '-c', dataset], check=True)\n",
    "\n",
    "    # extract 'train.csv' and 'asset_details.csv' from the downloaded archive to data_path\n",
    "    with zipfile.ZipFile(f'{dataset}.zip', 'r') as zip_ref:\n",
    "        zip_ref.extractall(data_path, members=['train.csv', 'asset_details.csv'])\n",
    "\n",
    "    print('Done!')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "tags": []
   },
   "source": [
    "## Load Data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# load data step\n",
    "def load_data(data_path):\n",
    "    '''Read the competition CSV files and cache them as pickles.\n",
    "\n",
    "    Loads 'train.csv' and 'asset_details.csv' from `data_path`, keeps only the\n",
    "    training rows dated 2020-01-01 or later and pickles the two frames to\n",
    "    'df_train' and 'df_asset_details' in the same directory.\n",
    "    '''\n",
    "    import os\n",
    "    import pickle\n",
    "    import subprocess\n",
    "    import sys\n",
    "\n",
    "    # make sure pip is current and pandas is available in the container\n",
    "    pip_commands = (\n",
    "        ['python', '-m', 'pip', 'install', '--upgrade', 'pip'],\n",
    "        [sys.executable, '-m', 'pip', 'install', 'pandas'],\n",
    "    )\n",
    "    for pip_command in pip_commands:\n",
    "        subprocess.run(pip_command)\n",
    "\n",
    "    import pandas as pd\n",
    "\n",
    "    train_csv = f'{data_path}/train.csv'\n",
    "    asset_details_csv = f'{data_path}/asset_details.csv'\n",
    "\n",
    "    # asset details are ordered by Asset_ID; training rows are read as they are\n",
    "    asset_details = pd.read_csv(asset_details_csv).sort_values('Asset_ID')\n",
    "    train_rows = pd.read_csv(train_csv)\n",
    "\n",
    "    # drop everything before 2020, based on the epoch-second timestamp column\n",
    "    train_rows['datetime'] = pd.to_datetime(train_rows['timestamp'], unit='s')\n",
    "    is_recent = train_rows['datetime'] >= '2020-01-01 00:00:00'\n",
    "    train_rows = train_rows[is_recent].copy()\n",
    "\n",
    "    # df_train is read by the feature_engineering component,\n",
    "    # df_asset_details by the merge_assets_features component\n",
    "    outputs = (('df_train', train_rows), ('df_asset_details', asset_details))\n",
    "    for file_name, frame in outputs:\n",
    "        with open(f'{data_path}/{file_name}', 'wb') as handle:\n",
    "            pickle.dump(frame, handle)\n",
    "\n",
    "    print('Done!')\n",
    "    return None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "tags": []
   },
   "source": [
    "## Feature Engineering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "# feature engineering step\n",
    "\n",
    "def feature_engineering(data_path):\n",
    "    '''Build technical-indicator features for every asset from the pickled training data.\n",
    "\n",
    "    Loads the 'df_train' pickle from `data_path`, computes the features of each\n",
    "    of the 14 assets with get_features and pickles the concatenated frame to\n",
    "    'feature_df' in the same directory.\n",
    "\n",
    "    Args:\n",
    "        data_path: Directory holding 'df_train'; 'feature_df' is written there.\n",
    "    '''\n",
    "    # install the necessary libraries\n",
    "    import sys, subprocess\n",
    "    subprocess.run(['python', '-m', 'pip', 'install', '--upgrade', 'pip'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'tqdm'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'talib-binary'])\n",
    "\n",
    "    # import Library\n",
    "    import pickle, talib, datetime\n",
    "    import numpy as np\n",
    "    import pandas as pd\n",
    "    from tqdm import tqdm\n",
    "\n",
    "    # Rows at or after 2021-05-01 00:00:00 are flagged as validation data.\n",
    "    # The cutoff is an explicit UTC epoch second (the 'timestamp' column is parsed\n",
    "    # with unit='s' below), so it does not shift with the machine's local timezone.\n",
    "    valid_start = int(datetime.datetime(2021, 5, 1, tzinfo=datetime.timezone.utc).timestamp())\n",
    "\n",
    "    # loading the df_train data\n",
    "    with open(f'{data_path}/df_train', 'rb') as f:\n",
    "        df_train = pickle.load(f)\n",
    "\n",
    "    # technical indicators\n",
    "\n",
    "    def RSI(df, n):\n",
    "        '''Relative Strength Index of the Close column over n periods (talib.RSI).'''\n",
    "        return talib.RSI(df['Close'], n)\n",
    "\n",
    "    def ATR(df, n):\n",
    "        '''Average True Range from High, Low and Close over n periods (talib.ATR).'''\n",
    "        return talib.ATR(df['High'], df.Low, df.Close, n)\n",
    "\n",
    "    def DEMA(data, time_period):\n",
    "        '''Double Exponential Moving Average of Close: 2*EMA - EMA(EMA).'''\n",
    "        ema = data['Close'].ewm(span=time_period, adjust=False).mean()\n",
    "        return 2*ema - ema.ewm(span=time_period, adjust=False).mean()\n",
    "\n",
    "    def upper_shadow(df):\n",
    "        '''High minus the larger of Close and Open.'''\n",
    "        return df['High'] - np.maximum(df['Close'], df['Open'])\n",
    "\n",
    "    def lower_shadow(df):\n",
    "        '''The smaller of Close and Open minus Low.'''\n",
    "        return np.minimum(df['Close'], df['Open']) - df['Low']\n",
    "\n",
    "    def get_features(df, asset_id, train=True):\n",
    "        '''\n",
    "        This function takes a dataframe with all asset data and returns the features for a single asset.\n",
    "\n",
    "        df - Full dataframe with all assets included\n",
    "        asset_id - integer from 0-13 inclusive to represent a cryptocurrency asset\n",
    "        train - True - training data: 'Target' is kept and a train_flg split column is added\n",
    "              - False - inference data: 'row_id' is kept instead of 'Target' and train_flg\n",
    "        '''\n",
    "        # filter based on asset id and sort based on time stamp\n",
    "        df = df[df['Asset_ID'] == asset_id]\n",
    "        df = df.sort_values('timestamp')\n",
    "\n",
    "        if train:\n",
    "            df_feat = df[['timestamp', 'Asset_ID', 'High', 'Low', 'Open', 'Close', 'Volume', 'Target']].copy()\n",
    "            # train_flg: 1 = training row, 0 = validation row\n",
    "            df_feat['train_flg'] = np.where(df_feat['timestamp'] >= valid_start, 0, 1)\n",
    "        else:\n",
    "            df = df.sort_values('row_id')\n",
    "            # 'timestamp' has to be kept: the datetime features below are derived from it\n",
    "            df_feat = df[['timestamp', 'Asset_ID', 'High', 'Low', 'Open', 'Close', 'Volume', 'row_id']].copy()\n",
    "\n",
    "        for i in tqdm([30, 120, 240]):\n",
    "            # applying technical indicators\n",
    "            df_feat[f'RSI_{i}'] = RSI(df_feat, i)\n",
    "            df_feat[f'ATR_{i}'] = ATR(df_feat, i)\n",
    "            df_feat[f'DEMA_{i}'] = DEMA(df_feat, i)\n",
    "\n",
    "        for i in tqdm([30, 120, 240]):\n",
    "            # creating lag features\n",
    "            df_feat[f'sma_{i}'] = df_feat['Close'].rolling(i).mean()/df_feat['Close'] - 1\n",
    "            df_feat[f'return_{i}'] = df_feat['Close']/df_feat['Close'].shift(i) - 1\n",
    "\n",
    "        # log features; np.log gives -inf for 0 and NaN for negative inputs,\n",
    "        # both of which end up as 0 after the replace/fillna below\n",
    "        df_feat['HL'] = np.log(df_feat['High'] - df_feat['Low'])\n",
    "        df_feat['OC'] = np.log(df_feat['Close'] - df_feat['Open'])\n",
    "        df_feat['lower_shadow'] = np.log(lower_shadow(df_feat))\n",
    "        df_feat['upper_shadow'] = np.log(upper_shadow(df_feat))\n",
    "\n",
    "        # replace inf with nan\n",
    "        df_feat = df_feat.replace([np.inf, -np.inf], np.nan)\n",
    "\n",
    "        # datetime features\n",
    "        df_feat['Date'] = pd.to_datetime(df_feat['timestamp'], unit='s')\n",
    "        df_feat['Day'] = df_feat['Date'].dt.weekday.astype(np.int32)\n",
    "        df_feat['dayofyear'] = df_feat['Date'].dt.dayofyear\n",
    "        # ISO week number; replaces the deprecated Series.dt.weekofyear accessor\n",
    "        df_feat['weekofyear'] = df_feat['Date'].dt.isocalendar().week.astype(np.int64)\n",
    "        df_feat['season'] = ((df_feat['Date'].dt.month)%12 + 3)//3\n",
    "\n",
    "        # drop the raw price/volume columns and the helper Date column\n",
    "        df_feat = df_feat.drop(['Open','Close','High','Low', 'Volume', 'Date'], axis=1)\n",
    "\n",
    "        # fill nan values with 0\n",
    "        df_feat = df_feat.fillna(0)\n",
    "\n",
    "        return df_feat\n",
    "\n",
    "    # create the features dataframe for each asset, then concatenate once\n",
    "    # (growing a frame with concat inside the loop copies all earlier rows every time)\n",
    "    per_asset_features = []\n",
    "    for i in range(14):\n",
    "        print(i)\n",
    "        per_asset_features.append(get_features(df_train, i, train=True))\n",
    "    feature_df = pd.concat(per_asset_features)\n",
    "\n",
    "    # save the feature engineered data as a pickle file to be used by the merge_assets_features component.\n",
    "    with open(f'{data_path}/feature_df', 'wb') as f:\n",
    "        pickle.dump(feature_df, f)\n",
    "\n",
    "    print('Done!')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "tags": []
   },
   "source": [
    "## Merge Assets Data and Features"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "# merge_assets_features step\n",
    "\n",
    "def merge_assets_features(data_path):\n",
    "    '''Attach each asset's Weight to the engineered features.\n",
    "\n",
    "    Loads the 'feature_df' and 'df_asset_details' pickles from `data_path`,\n",
    "    left-joins the Weight column on Asset_ID and pickles the result to\n",
    "    'merge_feature_df' in the same directory.\n",
    "    '''\n",
    "    import subprocess\n",
    "    import sys\n",
    "\n",
    "    # make sure pip is current and pandas is available in the container\n",
    "    subprocess.run(['python', '-m', 'pip', 'install', '--upgrade', 'pip'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])\n",
    "\n",
    "    import os\n",
    "    import pickle\n",
    "    import pandas as pd\n",
    "\n",
    "    def _load_pickle(file_name):\n",
    "        # read one pickled object from data_path\n",
    "        with open(f'{data_path}/{file_name}', 'rb') as handle:\n",
    "            return pickle.load(handle)\n",
    "\n",
    "    features = _load_pickle('feature_df')\n",
    "    asset_details = _load_pickle('df_asset_details')\n",
    "\n",
    "    # a left join keeps every feature row, whether or not its asset has a weight\n",
    "    asset_weights = asset_details[['Asset_ID', 'Weight']]\n",
    "    merged = features.merge(asset_weights, how='left', on=['Asset_ID'])\n",
    "\n",
    "    # merge_feature_df is read by the modeling component\n",
    "    with open(f'{data_path}/merge_feature_df', 'wb') as handle:\n",
    "        pickle.dump(merged, handle)\n",
    "\n",
    "    print('Done!')\n",
    "    return None"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "papermill": {
     "duration": 0.01421,
     "end_time": "2022-04-17T07:17:13.396620",
     "exception": false,
     "start_time": "2022-04-17T07:17:13.382410",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "## Modelling\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "# modeling step\n",
    "\n",
    "def modeling(data_path):\n",
    "    '''Train a LightGBM model on the merged features and save it with joblib.\n",
    "\n",
    "    Loads the 'merge_feature_df' pickle from `data_path`, trains on the rows\n",
    "    with train_flg == 1, validates on the rows with train_flg == 0 and dumps\n",
    "    the trained booster to 'lgb.jl' in the same directory.\n",
    "\n",
    "    Args:\n",
    "        data_path: Directory holding 'merge_feature_df'; 'lgb.jl' is written there.\n",
    "    '''\n",
    "    # install the necessary libraries\n",
    "    import sys, subprocess\n",
    "    subprocess.run(['python', '-m', 'pip', 'install', '--upgrade', 'pip'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'lightgbm'])\n",
    "    # joblib is imported below, so install it explicitly instead of relying on\n",
    "    # another package to pull it in\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'joblib'])\n",
    "\n",
    "    # import Library\n",
    "    import pickle, joblib\n",
    "    import pandas as pd\n",
    "    import numpy as np\n",
    "    import lightgbm as lgb\n",
    "\n",
    "    # loading the merged feature data\n",
    "    with open(f'{data_path}/merge_feature_df', 'rb') as f:\n",
    "        feature_df = pickle.load(f)\n",
    "\n",
    "    # define features for LGBM\n",
    "    features = ['Asset_ID', 'RSI_30', 'ATR_30',\n",
    "           'DEMA_30', 'RSI_120', 'ATR_120', 'DEMA_120', 'RSI_240', 'ATR_240',\n",
    "           'DEMA_240', 'sma_30', 'return_30', 'sma_120', 'return_120', 'sma_240',\n",
    "           'return_240', 'HL', 'OC', 'lower_shadow', 'upper_shadow', 'Day',\n",
    "           'dayofyear', 'weekofyear', 'season']\n",
    "    categoricals = ['Asset_ID']\n",
    "\n",
    "    # define the evaluation metric\n",
    "    def weighted_correlation(a, train_data):\n",
    "        '''Custom LightGBM metric: weighted correlation between predictions and labels.\n",
    "\n",
    "        a - predictions handed over by LightGBM\n",
    "        train_data - the lgb.Dataset being evaluated; weights are read from its add_w attribute (set below)\n",
    "        Returns the tuple (metric name, value, is_higher_better).\n",
    "        '''\n",
    "        w = np.ravel(train_data.add_w.values)\n",
    "        a = np.ravel(a)\n",
    "        b = np.ravel(train_data.get_label())\n",
    "\n",
    "        sum_w = np.sum(w)\n",
    "        mean_a = np.sum(a * w) / sum_w\n",
    "        mean_b = np.sum(b * w) / sum_w\n",
    "        var_a = np.sum(w * np.square(a - mean_a)) / sum_w\n",
    "        var_b = np.sum(w * np.square(b - mean_b)) / sum_w\n",
    "\n",
    "        cov = np.sum(a * b * w) / sum_w - mean_a * mean_b\n",
    "        corr = cov / np.sqrt(var_a * var_b)\n",
    "\n",
    "        return 'eval_wcorr', corr, True\n",
    "\n",
    "    # split once instead of re-running the same query for every use\n",
    "    train_df = feature_df.query('train_flg == 1')\n",
    "    valid_df = feature_df.query('train_flg == 0')\n",
    "\n",
    "    # define train and validation weights and datasets\n",
    "    weights_train = train_df[['Weight']]\n",
    "    weights_test = valid_df[['Weight']]\n",
    "\n",
    "    train_dataset = lgb.Dataset(train_df[features],\n",
    "                                train_df['Target'].values,\n",
    "                                feature_name=features,\n",
    "                                categorical_feature=categoricals)\n",
    "    val_dataset = lgb.Dataset(valid_df[features],\n",
    "                              valid_df['Target'].values,\n",
    "                              feature_name=features,\n",
    "                              categorical_feature=categoricals)\n",
    "    # the weights are attached as a plain attribute that only weighted_correlation reads;\n",
    "    # they are not passed to LightGBM as sample weights\n",
    "    train_dataset.add_w = weights_train\n",
    "    val_dataset.add_w = weights_test\n",
    "\n",
    "    # LGBM params\n",
    "    evals_result = {}\n",
    "    params = {'n_estimators': 1200,\n",
    "            'objective': 'regression',\n",
    "            'metric': 'rmse',\n",
    "            'boosting_type': 'gbdt',\n",
    "            'max_depth': -1,\n",
    "            'learning_rate': 0.01,\n",
    "            'seed': 2022,\n",
    "            'verbose': -1,\n",
    "            }\n",
    "\n",
    "    # Early stopping, periodic logging and metric recording are passed as callbacks:\n",
    "    # the early_stopping_rounds / verbose_eval / evals_result keyword arguments of\n",
    "    # lgb.train were removed in LightGBM 4.0 and the install above is not pinned.\n",
    "    training_callbacks = [\n",
    "        lgb.early_stopping(stopping_rounds=60),\n",
    "        lgb.log_evaluation(period=30),\n",
    "        lgb.record_evaluation(evals_result),\n",
    "    ]\n",
    "\n",
    "    # train LGBM\n",
    "    model = lgb.train(params=params,\n",
    "                      train_set=train_dataset,\n",
    "                      valid_sets=[val_dataset],\n",
    "                      feval=weighted_correlation,\n",
    "                      callbacks=training_callbacks\n",
    "                     )\n",
    "\n",
    "    # saving model\n",
    "    joblib.dump(model, f'{data_path}/lgb.jl')\n",
    "\n",
    "    print('Done!')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "papermill": {
     "duration": 0.01428,
     "end_time": "2022-04-17T07:17:23.959655",
     "exception": false,
     "start_time": "2022-04-17T07:17:23.945375",
     "status": "completed"
    },
    "tags": []
   },
   "source": [
    "## Evaluation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# evaluation step\n",
    "\n",
    "def evaluation_result(data_path, \n",
    "                metrics_path: OutputPath(str)) -> NamedTuple(\"EvaluationOutput\", [(\"mlpipeline_metrics\", \"Metrics\")]):\n",
    "    '''Export the validation scores of the trained model as Kubeflow metrics.\n",
    "\n",
    "    Loads 'lgb.jl' from `data_path`, reads the 'rmse' and 'eval_wcorr' entries\n",
    "    of the model's best_score for 'valid_0', writes them as JSON to\n",
    "    `metrics_path` and returns the same JSON string as mlpipeline_metrics.\n",
    "\n",
    "    Args:\n",
    "        data_path: Directory holding the 'lgb.jl' model file.\n",
    "        metrics_path: File path the metrics JSON is written to.\n",
    "\n",
    "    Returns:\n",
    "        An EvaluationOutput named tuple whose mlpipeline_metrics field is the metrics JSON string.\n",
    "    '''\n",
    "    # install the necessary libraries\n",
    "    import sys, subprocess\n",
    "    subprocess.run(['python', '-m', 'pip', 'install', '--upgrade', 'pip'])\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'lightgbm'])\n",
    "    # joblib is imported below, so install it explicitly instead of relying on\n",
    "    # another package to pull it in\n",
    "    subprocess.run([sys.executable, '-m', 'pip', 'install', 'joblib'])\n",
    "\n",
    "    # import Library\n",
    "    import json\n",
    "    from collections import namedtuple\n",
    "    import joblib\n",
    "    import lightgbm as lgb\n",
    "\n",
    "    # load model\n",
    "    model = joblib.load(f'{data_path}/lgb.jl')\n",
    "\n",
    "    # model evaluation: scores stored on the model for the validation set\n",
    "    validation_scores = model.best_score.get('valid_0')\n",
    "    root_mean_squared_error = validation_scores.get('rmse')\n",
    "    weighted_correlation = validation_scores.get('eval_wcorr')\n",
    "\n",
    "    # create kubeflow metric metadata for UI\n",
    "    metrics = {\n",
    "                'metrics': [\n",
    "                    {'name': 'root-mean-squared-error',\n",
    "                    'numberValue':  root_mean_squared_error,\n",
    "                    'format': 'RAW'},\n",
    "                    {'name': 'weighted-correlation',\n",
    "                    'numberValue':  weighted_correlation,\n",
    "                    'format': 'RAW'}\n",
    "                            ]\n",
    "              }\n",
    "\n",
    "    with open(metrics_path, 'w') as f:\n",
    "        json.dump(metrics, f)\n",
    "\n",
    "    output_tuple = namedtuple('EvaluationOutput', ['mlpipeline_metrics'])\n",
    "\n",
    "    return output_tuple(json.dumps(metrics))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create pipeline components \n",
    "\n",
    "using `create_component_from_func`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create light weight components\n",
    "# single place to change the image that every lightweight component runs in\n",
    "BASE_IMAGE = \"python:3.7.1\"\n",
    "\n",
    "download_op = comp.create_component_from_func(download_data, base_image=BASE_IMAGE)\n",
    "load_op = comp.create_component_from_func(load_data, base_image=BASE_IMAGE)\n",
    "merge_assets_features_op = comp.create_component_from_func(merge_assets_features, base_image=BASE_IMAGE)\n",
    "feature_eng_op = comp.create_component_from_func(feature_engineering, base_image=BASE_IMAGE)\n",
    "modeling_op = comp.create_component_from_func(modeling, base_image=BASE_IMAGE)\n",
    "evaluation_op = comp.create_component_from_func(evaluation_result, base_image=BASE_IMAGE)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Kubeflow pipeline creation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "# define pipeline\n",
    "@dsl.pipeline(\n",
    "    name='g-research-crypto-forecasting-pipeline',\n",
    "    description='Forecasting short term returns in 14 popular cryptocurrencies.',\n",
    ")\n",
    "def g_research_crypto_forecast_pipeline(dataset: str, data_path: str):\n",
    "    # dataset and data_path are the parameters fed into the pipeline at run time.\n",
    "\n",
    "    # Volume used to share data between the components; mounted at data_path in every step.\n",
    "    vop = dsl.VolumeOp(\n",
    "        name='create_data_volume',\n",
    "        resource_name='data-volume',\n",
    "        size='16Gi',\n",
    "        modes=dsl.VOLUME_MODE_RWO,\n",
    "    )\n",
    "\n",
    "    # Each step below mounts the volume handed on by the previous step (its .pvolume).\n",
    "\n",
    "    # Download step: also gets the kaggle-secret pod label.\n",
    "    download_step = (\n",
    "        download_op(dataset, data_path)\n",
    "        .add_pvolumes({data_path: vop.volume})\n",
    "        .add_pod_label('kaggle-secret', 'true')\n",
    "    )\n",
    "    # Load step.\n",
    "    load_step = load_op(data_path).add_pvolumes({data_path: download_step.pvolume})\n",
    "    # Feature engineering step.\n",
    "    feature_step = feature_eng_op(data_path).add_pvolumes({data_path: load_step.pvolume})\n",
    "    # Merge assets and features step.\n",
    "    merge_step = merge_assets_features_op(data_path).add_pvolumes({data_path: feature_step.pvolume})\n",
    "    # Modeling step.\n",
    "    modeling_step = modeling_op(data_path).add_pvolumes({data_path: merge_step.pvolume})\n",
    "    # Evaluation step.\n",
    "    evaluation_step = evaluation_op(data_path).add_pvolumes({data_path: modeling_step.pvolume})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the client used below to submit the run to the Kubeflow Pipelines API server.\n",
    "# It is built with no arguments, so all connection settings are left to kfp.Client's defaults.\n",
    "client = kfp.Client()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Values for the pipeline's `dataset` and `data_path` parameters.\n",
    "# dataset: Kaggle competition name; data_path: directory where the shared volume is mounted in each step.\n",
    "dataset = \"g-research-crypto-forecasting\"\n",
    "data_path = \"/mnt\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<a href=\"/pipeline/#/experiments/details/5cfad93a-84d1-4c4e-8811-2c7c51c78214\" target=\"_blank\" >Experiment details</a>."
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/html": [
       "<a href=\"/pipeline/#/runs/details/02d52c34-9a84-4be2-9eb7-2047c72704ef\" target=\"_blank\" >Run details</a>."
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "pipeline_func = g_research_crypto_forecast_pipeline\n",
    "\n",
    "# names under which the experiment and the run are created\n",
    "experiment_name = 'g_research_crypto_forecast_pipeline_lightweight'\n",
    "run_name = f'{pipeline_func.__name__} run'\n",
    "\n",
    "# values for the pipeline parameters\n",
    "arguments = dict(dataset=dataset, data_path=data_path)\n",
    "\n",
    "# Compile the pipeline into a compressed YAML definition next to the notebook.\n",
    "package_path = f'{experiment_name}.zip'\n",
    "kfp.compiler.Compiler().compile(pipeline_func, package_path)\n",
    "\n",
    "# Submit a run directly from the pipeline function.\n",
    "run_result = client.create_run_from_pipeline_func(\n",
    "    pipeline_func,\n",
    "    experiment_name=experiment_name,\n",
    "    run_name=run_name,\n",
    "    arguments=arguments,\n",
    ")\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "kubeflow_notebook": {
   "autosnapshot": true,
   "experiment": {
    "id": "2efb8e27-3b2e-439b-a53c-b1f9d7b94cfc",
    "name": "g-research-crypto-forecasting"
   },
   "experiment_name": "g-research-crypto-forecasting",
   "katib_metadata": {
    "algorithm": {
     "algorithmName": "grid"
    },
    "maxFailedTrialCount": 3,
    "maxTrialCount": 12,
    "objective": {
     "objectiveMetricName": "",
     "type": "minimize"
    },
    "parallelTrialCount": 3,
    "parameters": []
   },
   "katib_run": false,
   "pipeline_description": "Forecasting short term returns in 14 popular cryptocurrencies.",
   "pipeline_name": "g-research-crypto-forecasting-pipeline",
   "snapshot_volumes": true,
   "steps_defaults": [
    "label:access-ml-pipeline:true",
    "label:kaggle-secret:true",
    "label:access-rok:true"
   ],
   "volume_access_mode": "rwm",
   "volumes": [
    {
     "annotations": [],
     "mount_point": "/home/jovyan",
     "name": "demo-workspace-fb99v",
     "size": 15,
     "size_type": "Gi",
     "snapshot": false,
     "type": "clone"
    }
   ]
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  },
  "papermill": {
   "default_parameters": {},
   "duration": 32.012084,
   "end_time": "2022-04-17T07:17:25.053666",
   "environment_variables": {},
   "exception": null,
   "input_path": "__notebook__.ipynb",
   "output_path": "__notebook__.ipynb",
   "parameters": {},
   "start_time": "2022-04-17T07:16:53.041582",
   "version": "2.3.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
